package com.ihome.framework.core.lock;

import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;

import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
import com.alibaba.dubbo.common.utils.NetUtils;
import com.ihome.framework.core.cache.nkv.IHomeNkvClient;
import com.ihome.framework.core.cache.nkv.NkvOperateException;
import com.ihome.framework.core.utils.JsonUtil;
import com.netease.backend.nkv.client.NkvClient.NkvOption;
import com.netease.backend.nkv.client.Result;
import com.netease.backend.nkv.client.Result.ResultCode;
import com.netease.backend.nkv.client.error.NkvFlowLimit;
import com.netease.backend.nkv.client.error.NkvRpcError;
import com.netease.backend.nkv.client.error.NkvTimeout;

public class NkvDistributedLockImpl implements IHomeDistributedLock {

    private Logger LOG = LoggerFactory.getLogger(NkvDistributedLockImpl.class);


    private Set<Long> ACTIVE_THREAD_HASH_LIST = new ConcurrentHashSet<Long>();

    final static long DEFAULT_EXP = 10; // s

    private final String lockName;
    private volatile Long lockValue;
    private final long exp;
    private final int activesMax;
    private final IHomeNkvClient nkvClient;

    private final String PING = "ping";
    private final String STATE = "state";
    private final String ACTIVE = "active";
    private final Integer PING_EXP = 10*60; //10m
    private final short FORCE_UPDATE = 0;

    private final int LOCK_TIME_INDEX = 0;
    private final int LOCK_THREAD_HASH_INDEX = 1;
    private final int LOCK_LOCAL_IP_INDEX = 2;

    private final NkvOption DEFAULT_OPT;


    public NkvDistributedLockImpl(String lockName, IHomeNkvClient nkvClient) {
        this(lockName, nkvClient,DEFAULT_EXP, TimeUnit.SECONDS);
    }


    public NkvDistributedLockImpl(String lockName, IHomeNkvClient nkvClient, long exp, TimeUnit unit) {
        this(lockName, nkvClient,1,DEFAULT_EXP, TimeUnit.SECONDS);
    }

    public NkvDistributedLockImpl(String lockName, IHomeNkvClient nkvClient,int activesMax, long exp, TimeUnit unit) {
        this.exp = TimeUnit.SECONDS.convert(exp == 0L ? DEFAULT_EXP : exp, unit == null? TimeUnit.SECONDS : unit);
        Assert.hasLength(lockName);
        this.lockName = lockName;
        Assert.notNull(nkvClient);
        this.nkvClient = nkvClient;
        this.activesMax = activesMax <= 0 ? 1 : activesMax;
        DEFAULT_OPT = new NkvOption(nkvClient.getTimeout(),FORCE_UPDATE,PING_EXP);
    }

    @Override
    public void lock() throws DistributedLockException {
        if(nkvClient != null && ping()){
            synchronized (nkvClient) {
                try {
                    long now = System.currentTimeMillis();
                    Object[] values = new Object[]{now,currentThreadHashCode(),NetUtils.getLocalHost()};
                    nkvClient.setIfNotExist(lockName, values, (int) exp);
                    LOG.debug(String.format("lock name[%s]|values[%s]", lockName(),JsonUtil.toJSONString(values)));
                    lockValue = now;
                } catch (NkvOperateException e) {
                    if (e.getResultCode() == ResultCode.EXISTS) {
                        Long threadHashCode = lockInfo(LOCK_THREAD_HASH_INDEX);
                        if(check()){
                            if(threadHashCode != null && currentThreadHashCode() != threadHashCode)
                                throw new DistributedLockException();
                        }
                    } else
                        throw e;
                } 
                reentrantIncr(); // Success Lock Incr State 
            }
        }
    }


    protected int reentrantIncr(){
        // 持有锁计数+1
        try{
            if(nkvClient != null && ping()){
                Result<Integer> result = nkvClient.getNkvClient().incr(nkvClient.getNamespace(), nkvClient.serializer(new StringBuffer(
                        lockName()).append("-").append(currentThreadHashCode()).append("-").append(STATE).toString()), 1, 0, DEFAULT_OPT);

                LOG.debug(String.format("incr success[%s]... current reentrant state[%s]|lockName[%s]", result.isSuccess(),result.getResult(),lockName())); 

                if(result.isSuccess()){
                    return result.getResult();
                }
            }
        }catch (Exception e) {
            LOG.error(e.getMessage(),e);
        }
        return 0;
    }

    protected int reentrantDecr(){
        // 持有锁计数-1
        try{
            if(nkvClient != null && ping()){
                Result<Integer> result = nkvClient.getNkvClient().decr(nkvClient.getNamespace(), nkvClient.serializer(new StringBuffer(
                        lockName()).append("-").append(currentThreadHashCode()).append("-").append(STATE).toString()), 1, 0, DEFAULT_OPT);
                LOG.debug(String.format("decr success[%s]... current reentrant state[%s]|lockName[%s]", result.isSuccess(),result.getResult(),lockName())); 
                if(result.isSuccess()){
                    return result.getResult();
                }
            }
        }catch (Exception e) {
            LOG.error(e.getMessage(),e);
        }
        return 0;
    }

    protected <T> T lockInfo(int index){
        try{
            Object object = nkvClient.get(lockName);
            if(object != null && !(object instanceof String)){
                Object[] log = (Object[]) object;
                return (T) log[index];
            }
        }catch (Exception e) {
            LOG.error(e.getMessage(),e);
        }
        return null;
    }


    private long currentThreadHashCode(){

        long hashCode = ((long) (Thread.currentThread().getName().hashCode() & 0xFF) << 24)
                | ((long) (Thread.currentThread().hashCode() & 0xFF) << 16)
                | ((long) (Thread.currentThread().getId() & 0xFF) << 8)
                | (Integer.valueOf(Thread.currentThread().hashCode()).byteValue() & 0xFF);
        return hashCode;
    }

    /* (non-Javadoc)
     * @see com.ihome.framework.core.lock.IHomeDistributedLock#ping()
     */
    @Override
    public boolean ping() {
        try {
            if(nkvClient != null){
                nkvClient.getNkvClient().incr(nkvClient.getNamespace(), nkvClient.serializer(PING), 1, 1, DEFAULT_OPT);
                return true;
            }
        } catch (Exception e) { }
        LOG.warn("nkv client is close.");
        return false;
    }


    /* (non-Javadoc)
     * @see com.ihome.framework.core.lock.IHomeDistributedLock#check()
     */
    @Override
    public boolean check() {
        Long tempValue = lockInfo(LOCK_TIME_INDEX);
        if(nkvClient != null && isLock() && tempValue != null){
            this.lockValue = tempValue;
            long now = System.currentTimeMillis();
            long useTime = now - lockValue;
            long exp = TimeUnit.MILLISECONDS.convert(this.exp, TimeUnit.SECONDS);
            if(useTime >= exp){
                // lock time out 
                LOG.info(String.format("Check Lock Time Out[%s]|start:[%s] - now:[%s]... ", lockName(),lockValue,now));
                nkvClient.delete(lockName); // remove lock key
                return false;
            }
            return true;
        }
        return false;
    }


    @Override
    public boolean tryLock(int timeout, TimeUnit unit) {
        boolean flag = false;
        long unitTime = 0L;

        if(timeout <= 0){
            unitTime = Long.MAX_VALUE;
            LOG.warn(String.format("lock[%s] wait timeout no limit",lockName()));
        }else{
            unitTime = TimeUnit.MILLISECONDS.convert(timeout, unit);
        }

        Long time = System.currentTimeMillis(); // start time
        for (;;) {
            try {
                lock();
                flag = true;
                break;
            } catch (DistributedLockException e) {
                Long now = System.currentTimeMillis(); // current time
                if ((now - time) >= unitTime) {
                    break;
                }
                try {
                    LOG.debug(String.format("DistributedLock|LockName:[%s]...", lockName));
                    Thread.sleep(unitTime < 300L ? unitTime : 300L);
                } catch (InterruptedException e1) {}
            } catch (Exception e) {
                throw e;
            }
        }
        return flag;
    }

    /* (non-Javadoc)
     * @see com.ihome.framework.core.lock.IHomeDistributedLock#tryActivesLock(int, java.util.concurrent.TimeUnit)
     */
    @Override
    public boolean tryActivesLock(int timeout, TimeUnit unit) throws Exception {
        if(nkvClient != null && ping()){

            long unitTime = 0L;
            if(timeout <= 0){
                unitTime = Long.MAX_VALUE;
                LOG.warn(String.format("lock[%s] wait timeout no limit",lockName()));
            }else{
                unitTime =TimeUnit.MILLISECONDS.convert(timeout, unit);
            }
            int active = currentActivesCount();
            if(active >= activesMax-1){
                LOG.info("concurrent lock wait...");
                Long time = System.currentTimeMillis(); // start time
                for(;;){
                    active = currentActivesCount();
                    if(active < activesMax){
                        incrActives();
                        return true; 
                    }
                    Long now = System.currentTimeMillis(); // current time
                    if ((now - time) >= unitTime) {
                        // time out 
                        break;
                    }
                    try {
                        LOG.debug(String.format("DistributedActivesLock|LockName:[%s]...", lockName));
                        Thread.sleep(unitTime < 300L ? unitTime : 300L);
                    } catch (InterruptedException e1) {}
                }
            }else{
                incrActives();
                return true;
            }
        }
        LOG.info("concurrent lock pass...");
        return false;
    }
    
    
    private void incrActives() throws NkvRpcError, NkvFlowLimit, NkvTimeout, InterruptedException{
        ACTIVE_THREAD_HASH_LIST.add(currentThreadHashCode());
        Result<Integer> result = nkvClient.getNkvClient().incr(nkvClient.getNamespace(),nkvClient.serializer(buildLockActivesName()), 1, 0, DEFAULT_OPT);
        Assert.isTrue(result.isSuccess(),"nkv incr error!");
        LOG.debug(String.format("concurrent lock[%s]|actives[%s]",lockName(),activesMax));
    }


    protected int currentActivesCount() throws Exception{
        synchronized (nkvClient) {
            int  active = nkvClient.getNkvClient().incr(nkvClient.getNamespace(),nkvClient.serializer(buildLockActivesName()), 1, 0, DEFAULT_OPT).getResult() - 1;
            nkvClient.getNkvClient().decr(nkvClient.getNamespace(),nkvClient.serializer(buildLockActivesName()), 1, 0, DEFAULT_OPT);
            return active;
        }
    }

    /* (non-Javadoc)
     * @see com.ihome.framework.core.lock.IHomeDistributedLock#activesUnlock()
     */
    @Override
    public boolean activesUnlock() throws Exception {
        if(nkvClient != null && ping()){
            if( nkvClient.get(buildLockActivesName()) != null){
                try{
                    Result<Integer> result = nkvClient.getNkvClient().decr(nkvClient.getNamespace(),nkvClient.serializer(buildLockActivesName()), 1, 1, DEFAULT_OPT);
                    Assert.isTrue(result.isSuccess(),"nkv decr error!");
                    LOG.debug(String.format("concurrent unlock[%s]|current[%s]|actives[%s]",lockName(),result.getResult(),activesMax));
                    if(result.getResult() < activesMax){
                        if(result.getResult() <= 0){
                            LOG.debug("del actives size...");
                        }
                        return true;
                    }
                    return false;
                }finally {
                    ACTIVE_THREAD_HASH_LIST.remove(currentThreadHashCode());
                }
            }
        }
        return false; 
    }

    protected String buildLockActivesName() {
        return new StringBuffer(lockName()).append("-").append(ACTIVE).toString();
    }

    @Override
    public boolean unlock() {
        if(nkvClient != null ){
            synchronized (nkvClient) {
                if (nkvClient.get(lockName) != null) {
                    if(reentrantDecr() <= 0){
                        LOG.info(new StringBuffer(lockName()).append(" unlock... ").toString());
                        nkvClient.delete(lockName);
                        return true;
                    }
                }
            }
        }
        return false;
    }

    @Override
    public boolean isLock() {
        if(nkvClient != null && ping()){
            synchronized (nkvClient) {
                return nkvClient.get(lockName) != null;
            }
        }
        return false;
    }

    /* (non-Javadoc)
     * @see com.ihome.framework.core.lock.IHomeDistributedLock#lockName()
     */
    @Override
    public String lockName() {
        return lockName;
    }


    /* (non-Javadoc)
     * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
     */
    @Override
    public void afterPropertiesSet() throws Exception {
    }

    /**
     * @return the ACTIVE_THREAD_HASH_LIST
     */
    public Set<Long> getActiveThreadHashList() {
        return ACTIVE_THREAD_HASH_LIST;
    }


    /* (non-Javadoc)
     * @see org.springframework.beans.factory.DisposableBean#destroy()
     */
    @Override
    public void destroy() throws Exception {
        try{
            String local = lockInfo(LOCK_LOCAL_IP_INDEX); 
            if(isLock() && StringUtils.isNotBlank(local) && local.equals(NetUtils.getLocalHost())){
                unlock();
            }
        }finally {
            nkvClient.delete(buildLockActivesName());
            ACTIVE_THREAD_HASH_LIST.clear();
            ACTIVE_THREAD_HASH_LIST = null;
        }
    }

}
